# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import unicode_literals

import os

import mozunit

from mozpack.copier import (
    FileCopier,
    FileRegistry,
)
from mozpack.manifests import (
    InstallManifest,
    UnreadableInstallManifest,
)
from mozpack.test.test_files import TestWithTmpDir


class TestInstallManifest(TestWithTmpDir):
    def test_construct(self):
        m = InstallManifest()
        self.assertEqual(len(m), 0)

    def test_malformed(self):
        f = self.tmppath('manifest')
        open(f, 'wb').write('junk\n')
        with self.assertRaises(UnreadableInstallManifest):
            m = InstallManifest(f)

    def test_adds(self):
        m = InstallManifest()
        m.add_symlink('s_source', 's_dest')
        m.add_copy('c_source', 'c_dest')
        m.add_required_exists('e_dest')
        m.add_optional_exists('o_dest')
        m.add_pattern_symlink('ps_base', 'ps/*', 'ps_dest')
        m.add_pattern_copy('pc_base', 'pc/**', 'pc_dest')
        m.add_preprocess('p_source', 'p_dest', 'p_source.pp')
        m.add_content('content', 'content')

        self.assertEqual(len(m), 8)
        self.assertIn('s_dest', m)
        self.assertIn('c_dest', m)
        self.assertIn('p_dest', m)
        self.assertIn('e_dest', m)
        self.assertIn('o_dest', m)
        self.assertIn('content', m)

        with self.assertRaises(ValueError):
            m.add_symlink('s_other', 's_dest')

        with self.assertRaises(ValueError):
            m.add_copy('c_other', 'c_dest')

        with self.assertRaises(ValueError):
            m.add_preprocess('p_other', 'p_dest', 'p_other.pp')

        with self.assertRaises(ValueError):
            m.add_required_exists('e_dest')

        with self.assertRaises(ValueError):
            m.add_optional_exists('o_dest')

        with self.assertRaises(ValueError):
            m.add_pattern_symlink('ps_base', 'ps/*', 'ps_dest')

        with self.assertRaises(ValueError):
            m.add_pattern_copy('pc_base', 'pc/**', 'pc_dest')

        with self.assertRaises(ValueError):
            m.add_content('content', 'content')

    def _get_test_manifest(self):
        m = InstallManifest()
        m.add_symlink(self.tmppath('s_source'), 's_dest')
        m.add_copy(self.tmppath('c_source'), 'c_dest')
        m.add_preprocess(self.tmppath('p_source'), 'p_dest', self.tmppath('p_source.pp'), '#', {'FOO':'BAR', 'BAZ':'QUX'})
        m.add_required_exists('e_dest')
        m.add_optional_exists('o_dest')
        m.add_pattern_symlink('ps_base', '*', 'ps_dest')
        m.add_pattern_copy('pc_base', '**', 'pc_dest')
        m.add_content('the content\non\nmultiple lines', 'content')

        return m

    def test_serialization(self):
        m = self._get_test_manifest()

        p = self.tmppath('m')
        m.write(path=p)
        self.assertTrue(os.path.isfile(p))

        with open(p, 'rb') as fh:
            c = fh.read()

        self.assertEqual(c.count('\n'), 9)

        lines = c.splitlines()
        self.assertEqual(len(lines), 9)

        self.assertEqual(lines[0], '5')

        m2 = InstallManifest(path=p)
        self.assertEqual(m, m2)
        p2 = self.tmppath('m2')
        m2.write(path=p2)

        with open(p2, 'rb') as fh:
            c2 = fh.read()

        self.assertEqual(c, c2)

    def test_populate_registry(self):
        m = self._get_test_manifest()
        r = FileRegistry()
        m.populate_registry(r)

        self.assertEqual(len(r), 6)
        self.assertEqual(r.paths(), ['c_dest', 'content', 'e_dest', 'o_dest',
                                     'p_dest', 's_dest'])

    def test_pattern_expansion(self):
        source = self.tmppath('source')
        os.mkdir(source)
        os.mkdir('%s/base' % source)
        os.mkdir('%s/base/foo' % source)

        with open('%s/base/foo/file1' % source, 'a'):
            pass

        with open('%s/base/foo/file2' % source, 'a'):
            pass

        m = InstallManifest()
        m.add_pattern_symlink('%s/base' % source, '**', 'dest')

        c = FileCopier()
        m.populate_registry(c)
        self.assertEqual(c.paths(), ['dest/foo/file1', 'dest/foo/file2'])

    def test_or(self):
        m1 = self._get_test_manifest()
        orig_length = len(m1)
        m2 = InstallManifest()
        m2.add_symlink('s_source2', 's_dest2')
        m2.add_copy('c_source2', 'c_dest2')

        m1 |= m2

        self.assertEqual(len(m2), 2)
        self.assertEqual(len(m1), orig_length + 2)

        self.assertIn('s_dest2', m1)
        self.assertIn('c_dest2', m1)

    def test_copier_application(self):
        dest = self.tmppath('dest')
        os.mkdir(dest)

        to_delete = self.tmppath('dest/to_delete')
        with open(to_delete, 'a'):
            pass

        with open(self.tmppath('s_source'), 'wt') as fh:
            fh.write('symlink!')

        with open(self.tmppath('c_source'), 'wt') as fh:
            fh.write('copy!')

        with open(self.tmppath('p_source'), 'wt') as fh:
            fh.write('#define FOO 1\npreprocess!')

        with open(self.tmppath('dest/e_dest'), 'a'):
            pass

        with open(self.tmppath('dest/o_dest'), 'a'):
            pass

        m = self._get_test_manifest()
        c = FileCopier()
        m.populate_registry(c)
        result = c.copy(dest)

        self.assertTrue(os.path.exists(self.tmppath('dest/s_dest')))
        self.assertTrue(os.path.exists(self.tmppath('dest/c_dest')))
        self.assertTrue(os.path.exists(self.tmppath('dest/p_dest')))
        self.assertTrue(os.path.exists(self.tmppath('dest/e_dest')))
        self.assertTrue(os.path.exists(self.tmppath('dest/o_dest')))
        self.assertTrue(os.path.exists(self.tmppath('dest/content')))
        self.assertFalse(os.path.exists(to_delete))

        with open(self.tmppath('dest/s_dest'), 'rt') as fh:
            self.assertEqual(fh.read(), 'symlink!')

        with open(self.tmppath('dest/c_dest'), 'rt') as fh:
            self.assertEqual(fh.read(), 'copy!')

        with open(self.tmppath('dest/p_dest'), 'rt') as fh:
            self.assertEqual(fh.read(), 'preprocess!')

        self.assertEqual(result.updated_files, set(self.tmppath(p) for p in (
            'dest/s_dest', 'dest/c_dest', 'dest/p_dest', 'dest/content')))
        self.assertEqual(result.existing_files,
            set([self.tmppath('dest/e_dest'), self.tmppath('dest/o_dest')]))
        self.assertEqual(result.removed_files, {to_delete})
        self.assertEqual(result.removed_directories, set())

    def test_preprocessor(self):
        manifest = self.tmppath('m')
        deps = self.tmppath('m.pp')
        dest = self.tmppath('dest')
        include = self.tmppath('p_incl')

        with open(include, 'wt') as fh:
            fh.write('#define INCL\n')
        time = os.path.getmtime(include) - 3
        os.utime(include, (time, time))

        with open(self.tmppath('p_source'), 'wt') as fh:
            fh.write('#ifdef FOO\n#if BAZ == QUX\nPASS1\n#endif\n#endif\n')
            fh.write('#ifdef DEPTEST\nPASS2\n#endif\n')
            fh.write('#include p_incl\n#ifdef INCLTEST\nPASS3\n#endif\n')
        time = os.path.getmtime(self.tmppath('p_source')) - 3
        os.utime(self.tmppath('p_source'), (time, time))

        # Create and write a manifest with the preprocessed file, then apply it.
        # This should write out our preprocessed file.
        m = InstallManifest()
        m.add_preprocess(self.tmppath('p_source'), 'p_dest', deps, '#', {'FOO':'BAR', 'BAZ':'QUX'})
        m.write(path=manifest)

        m = InstallManifest(path=manifest)
        c = FileCopier()
        m.populate_registry(c)
        c.copy(dest)

        self.assertTrue(os.path.exists(self.tmppath('dest/p_dest')))

        with open(self.tmppath('dest/p_dest'), 'rt') as fh:
            self.assertEqual(fh.read(), 'PASS1\n')

        # Create a second manifest with the preprocessed file, then apply it.
        # Since this manifest does not exist on the disk, there should not be a
        # dependency on it, and the preprocessed file should not be modified.
        m2 = InstallManifest()
        m2.add_preprocess(self.tmppath('p_source'), 'p_dest', deps, '#', {'DEPTEST':True})
        c = FileCopier()
        m2.populate_registry(c)
        result = c.copy(dest)

        self.assertFalse(self.tmppath('dest/p_dest') in result.updated_files)
        self.assertTrue(self.tmppath('dest/p_dest') in result.existing_files)

        # Write out the second manifest, then load it back in from the disk.
        # This should add the dependency on the manifest file, so our
        # preprocessed file should be regenerated with the new defines.
        # We also set the mtime on the destination file back, so it will be
        # older than the manifest file.
        m2.write(path=manifest)
        time = os.path.getmtime(manifest) - 1
        os.utime(self.tmppath('dest/p_dest'), (time, time))
        m2 = InstallManifest(path=manifest)
        c = FileCopier()
        m2.populate_registry(c)
        self.assertTrue(c.copy(dest))

        with open(self.tmppath('dest/p_dest'), 'rt') as fh:
            self.assertEqual(fh.read(), 'PASS2\n')

        # Set the time on the manifest back, so it won't be picked up as
        # modified in the next test
        time = os.path.getmtime(manifest) - 1
        os.utime(manifest, (time, time))

        # Update the contents of a file included by the source file. This should
        # cause the destination to be regenerated.
        with open(include, 'wt') as fh:
            fh.write('#define INCLTEST\n')

        time = os.path.getmtime(include) - 1
        os.utime(self.tmppath('dest/p_dest'), (time, time))
        c = FileCopier()
        m2.populate_registry(c)
        self.assertTrue(c.copy(dest))

        with open(self.tmppath('dest/p_dest'), 'rt') as fh:
            self.assertEqual(fh.read(), 'PASS2\nPASS3\n')

    def test_preprocessor_dependencies(self):
        manifest = self.tmppath('m')
        deps = self.tmppath('m.pp')
        dest = self.tmppath('dest')
        source = self.tmppath('p_source')
        destfile = self.tmppath('dest/p_dest')
        include = self.tmppath('p_incl')
        os.mkdir(dest)

        with open(source, 'wt') as fh:
            fh.write('#define SRC\nSOURCE\n')
        time = os.path.getmtime(source) - 3
        os.utime(source, (time, time))

        with open(include, 'wt') as fh:
            fh.write('INCLUDE\n')
        time = os.path.getmtime(source) - 3
        os.utime(include, (time, time))

        # Create and write a manifest with the preprocessed file.
        m = InstallManifest()
        m.add_preprocess(source, 'p_dest', deps, '#', {'FOO':'BAR', 'BAZ':'QUX'})
        m.write(path=manifest)

        time = os.path.getmtime(source) - 5
        os.utime(manifest, (time, time))

        # Now read the manifest back in, and apply it. This should write out
        # our preprocessed file.
        m = InstallManifest(path=manifest)
        c = FileCopier()
        m.populate_registry(c)
        self.assertTrue(c.copy(dest))

        with open(destfile, 'rt') as fh:
            self.assertEqual(fh.read(), 'SOURCE\n')

        # Next, modify the source to #INCLUDE another file.
        with open(source, 'wt') as fh:
            fh.write('SOURCE\n#include p_incl\n')
        time = os.path.getmtime(source) - 1
        os.utime(destfile, (time, time))

        # Apply the manifest, and confirm that it also reads the newly included
        # file.
        m = InstallManifest(path=manifest)
        c = FileCopier()
        m.populate_registry(c)
        c.copy(dest)

        with open(destfile, 'rt') as fh:
            self.assertEqual(fh.read(), 'SOURCE\nINCLUDE\n')

        # Set the time on the source file back, so it won't be picked up as
        # modified in the next test.
        time = os.path.getmtime(source) - 1
        os.utime(source, (time, time))

        # Now, modify the include file (but not the original source).
        with open(include, 'wt') as fh:
            fh.write('INCLUDE MODIFIED\n')
        time = os.path.getmtime(include) - 1
        os.utime(destfile, (time, time))

        # Apply the manifest, and confirm that the change to the include file
        # is detected. That should cause the preprocessor to run again.
        m = InstallManifest(path=manifest)
        c = FileCopier()
        m.populate_registry(c)
        c.copy(dest)

        with open(destfile, 'rt') as fh:
            self.assertEqual(fh.read(), 'SOURCE\nINCLUDE MODIFIED\n')

        # ORing an InstallManifest should copy file dependencies
        m = InstallManifest()
        m |= InstallManifest(path=manifest)
        c = FileCopier()
        m.populate_registry(c)
        e = c._files['p_dest']
        self.assertEqual(e.extra_depends, [manifest])

if __name__ == '__main__':
    mozunit.main()
